package drds.plus.sql_process.optimizer.execute_plan_optimizer;

import drds.plus.common.jdbc.Parameters;
import drds.plus.common.properties.ConnectionParams;
import drds.plus.common.properties.ConnectionProperties;
import drds.plus.common.properties.ParametersManager;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.*;
import drds.plus.sql_process.utils.OptimizerUtils;
import drds.tools.$;

import java.util.Map;

/**
 * Decides how each {@link MergeQuery} in an execute plan runs its sub plans:
 * sequentially, concurrently per data node, or fully concurrently.
 *
 * <pre>
 * 1. No aggregate function and no condition: sequential.
 * 2. Otherwise:
 *     a. An order by exists (including one pushed down because of the
 *        group by / distinct / sort-merge-join join columns): fully concurrent.
 *     b. The group by or distinct covers all sharding columns and there is no
 *        other order by (push-down is done first): data-node concurrent.
 *     c. A group by exists (case a already excluded): fully concurrent.
 *     d. Every other case: data-node concurrent.
 *
 * Full concurrency tuning: when the temporary-table selection mode is enabled,
 * data-node concurrency is preferred in order to limit the number of
 * connections in use.
 * </pre>
 */
public class MergeQueryConcurrencyWayOptimizer implements ExecutePlanOptimizer {

    public MergeQueryConcurrencyWayOptimizer() {
    }

    /**
     * Assigns a concurrency way to every merge query reachable from the plan.
     * When the {@code merge_concurrent} option is set it overrides the automatic
     * decision for all merges (see {@link #judgeQueryConcurrencyWay}).
     *
     * @param executePlan root of the plan to annotate; it is modified in place
     * @param parameters  not used by this optimizer
     * @param extraCmd    connection options; may be {@code null}
     * @return the same {@code executePlan} instance that was passed in
     */
    public ExecutePlan optimize(ExecutePlan executePlan, Parameters parameters, Map<String, Object> extraCmd) {
        this.setQueryConcurrency(executePlan, extraCmd);
        return executePlan;
    }

    /**
     * Walks the plan tree and sets the concurrency way on every merge query found.
     * A {@code null} plan matches none of the {@code instanceof} checks and is ignored.
     */
    private void setQueryConcurrency(ExecutePlan executePlan, Map<String, Object> extraCmd) {
        if (executePlan instanceof MergeQuery) {
            MergeQuery mergeQuery = (MergeQuery) executePlan;
            mergeQuery.setQueryConcurrency(judgeQueryConcurrencyWay(extraCmd, mergeQuery));
            for (ExecutePlan subExecutePlan : mergeQuery.getExecutePlanList()) {
                this.setQueryConcurrency(subExecutePlan, extraCmd);
            }
        }

        if (executePlan instanceof Join) {
            Join join = (Join) executePlan;
            this.setQueryConcurrency(join.getLeftNode(), extraCmd);
            this.setQueryConcurrency(join.getRightNode(), extraCmd);
        }

        if (executePlan instanceof QueryWithIndex) {
            QueryWithIndex queryWithIndex = (QueryWithIndex) executePlan;
            if (queryWithIndex.getSubQuery() != null) {
                this.setQueryConcurrency(queryWithIndex.getSubQuery(), extraCmd);
            }
        }
    }

    /**
     * Picks the concurrency way for a single merge query.
     *
     * <pre>
     * Explicit merge_concurrent option present:
     *     parses as true  : fully concurrent (data-node concurrent in temporary-table mode)
     *     anything else   : sequential
     * Otherwise:
     *     1. The merge carries a sql (hint-routed directly): data-node concurrent.
     *     2. No aggregate and the merge must keep sub plans sequential
     *        (see {@link #mustRunSequentially}): sequential, and the whole
     *        sub tree is marked lazy-load.
     *     3. Temporary-table mode enabled: data-node concurrent.
     *     4. An order by exists: fully concurrent.
     *     5. No aggregate: data-node concurrent (the most conservative choice).
     *     6. Group by / distinct on the sharding columns: data-node concurrent.
     *     7. A group by exists: fully concurrent.
     *     8. Everything else: data-node concurrent.
     * </pre>
     *
     * @param extraCmd   connection options; may be {@code null}, which is treated as "no options"
     * @param mergeQuery merge query to classify
     * @return the concurrency way to apply to {@code mergeQuery}
     */
    private QueryConcurrencyWay judgeQueryConcurrencyWay(Map<String, Object> extraCmd, MergeQuery mergeQuery) {
        // Read the option null-safely and without a blind String cast: the map is
        // Map<String, Object>, so the value may legitimately be e.g. a Boolean.
        Object rawMergeConcurrent = extraCmd == null ? null : extraCmd.get(ConnectionProperties.merge_concurrent);
        String mergeConcurrentValue = rawMergeConcurrent == null ? null : rawMergeConcurrent.toString();
        boolean chooseTemporaryTable = chooseTemporaryTable(extraCmd);

        if (!$.isNullOrEmpty(mergeConcurrentValue)) {
            // The explicit setting wins over any automatic decision.
            if (!Boolean.parseBoolean(mergeConcurrentValue)) {
                return QueryConcurrencyWay.sequential;// the most conservative choice
            }
            // Temporary-table mode downgrades full concurrency to data-node concurrency.
            return chooseTemporaryTable ? QueryConcurrencyWay.data_node_concurrent : QueryConcurrencyWay.concurrent;
        }

        if (mergeQuery.getSql() != null) {
            // A sql on the merge means it was routed directly by a hint.
            return QueryConcurrencyWay.data_node_concurrent;
        }

        boolean existAggregate = mergeQuery.isExistAggregate();
        if (!existAggregate && mustRunSequentially(mergeQuery)) {
            setLazyLoadRecursively(mergeQuery, true);
            return QueryConcurrencyWay.sequential;
        }

        if (chooseTemporaryTable) {
            // Temporary-table mode is on: use data-node concurrency.
            return QueryConcurrencyWay.data_node_concurrent;
        }

        if ($.isNotNullAndHasElement(mergeQuery.getOrderByList())) {
            // case a: an order by exists
            return QueryConcurrencyWay.concurrent;
        }
        if (!existAggregate) {
            return QueryConcurrencyWay.data_node_concurrent;// the most conservative choice
        }

        // Aggregates exist: at least data-node concurrency from here on.
        if (mergeQuery.isGroupByShardColumns() || mergeQuery.isDistinctGroupByShardColumns()) {
            // case b
            return QueryConcurrencyWay.data_node_concurrent;
        }
        if ($.isNotNullAndHasElement(mergeQuery.getGroupByList())) {
            // case c
            return QueryConcurrencyWay.concurrent;
        }
        // case d
        return QueryConcurrencyWay.data_node_concurrent;
    }

    /**
     * Tells whether a merge query without aggregates has to run its sub plans sequentially.
     * That is the case when either
     * <ul>
     * <li>a limit is present but no order by (concurrency is not allowed then), or</li>
     * <li>there is no limit, no order by, no group by, no having and no filter,
     * i.e. a plain {@code select * from xxx}, which is not parallelized for now.</li>
     * </ul>
     */
    private boolean mustRunSequentially(MergeQuery mergeQuery) {
        boolean hasOrderBy = $.isNotNullAndHasElement(mergeQuery.getOrderByList());
        if (mergeQuery.getLimitFrom() != null || mergeQuery.getLimitTo() != null) {
            return !hasOrderBy;
        }
        return !hasOrderBy
                && !$.isNotNullAndHasElement(mergeQuery.getGroupByList())
                && mergeQuery.getHaving() == null
                && OptimizerUtils.isNoFilter(mergeQuery);
    }

    /**
     * Sets the lazy-load flag on the plan and, for {@link Query} plans, on the
     * children reachable through merge sub plans, join sides and index sub queries.
     * A {@code null} plan is ignored, matching how {@link #setQueryConcurrency}
     * treats missing children.
     */
    private void setLazyLoadRecursively(ExecutePlan executePlan, boolean lazyLoad) {
        if (executePlan == null) {
            return;
        }
        executePlan.setLazyLoad(lazyLoad);
        if (!(executePlan instanceof Query)) {
            // Only Query plans are descended into.
            return;
        }

        if (executePlan instanceof MergeQuery) {
            for (ExecutePlan subExecutePlan : ((MergeQuery) executePlan).getExecutePlanList()) {
                setLazyLoadRecursively(subExecutePlan, lazyLoad);
            }
        }

        if (executePlan instanceof Join) {
            Join join = (Join) executePlan;
            setLazyLoadRecursively(join.getLeftNode(), lazyLoad);
            setLazyLoadRecursively(join.getRightNode(), lazyLoad);
        }

        if (executePlan instanceof QueryWithIndex) {
            setLazyLoadRecursively(((QueryWithIndex) executePlan).getSubQuery(), lazyLoad);
        }
    }

    /**
     * Reports whether the temporary-table selection mode is enabled, i.e. both
     * {@code ALLOW_TEMPORARY_TABLE} and {@code CHOOSE_TEMPORARY_TABLE} read as true.
     *
     * @param extraCmds connection options; {@code null} yields {@code false}
     */
    private boolean chooseTemporaryTable(Map<String, Object> extraCmds) {
        if (extraCmds == null) {
            return false;
        }

        ParametersManager parametersManager = new ParametersManager(extraCmds);
        return parametersManager.getBoolean(ConnectionParams.ALLOW_TEMPORARY_TABLE) && parametersManager.getBoolean(ConnectionParams.CHOOSE_TEMPORARY_TABLE);
    }
}
